package drds.data_propagate.parse;

import drds.common.$;
import drds.common.Constants;
import drds.data_propagate.binlog_event.exception.TableIdNotFoundException;
import drds.data_propagate.binlog_event_filter.BinlogEventFilter;
import drds.data_propagate.common.AbstractLifeCycle;
import drds.data_propagate.driver.packets.GtidSet;
import drds.data_propagate.driver.packets.GtidSetImpl;
import drds.data_propagate.entry.Entry;
import drds.data_propagate.entry.EntryHeader;
import drds.data_propagate.entry.EntryType;
import drds.data_propagate.entry.position.BinLogEventPosition;
import drds.data_propagate.entry.position.EntryPosition;
import drds.data_propagate.entry.position.SlavePosition;
import drds.data_propagate.parse.binlog_event_position_manager.BinLogEventPositionManager;
import drds.data_propagate.parse.exception.ParseException;
import drds.data_propagate.parse.exception.PositionNotFoundException;
import drds.data_propagate.parse.multistage_coordinator.MultistageCoordinator;
import drds.data_propagate.parse.multistage_coordinator.MultistageCoordinatorImpl;
import drds.data_propagate.sink.EventListSink;
import drds.data_propagate.sink.exception.SinkException;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;


public abstract class AbstractEventParser<BinLogEvent> extends AbstractLifeCycle implements EventParser<BinLogEvent> {

    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    @Setter
    @Getter
    protected BinLogEventPositionManager binLogEventPositionManager = null;
    @Setter
    @Getter
    protected EventListSink<List<Entry>> eventListSink = null;
    @Setter
    @Getter
    protected BinlogEventFilter binlogEventFilter = null;
    @Setter
    @Getter
    protected BinlogEventFilter eventBlackFilter = null;


    @Setter
    @Getter
    protected long processingInterval = -1;
    // 认证信息
    @Setter
    @Getter
    protected volatile AuthenticationInfo runningAuthenticationInfo;
    @Setter
    @Getter
    protected String taskId;
    @Setter
    @Getter
    protected BinlogParser binLogEventParser = null;
    @Setter
    @Getter
    protected Thread parseThread = null;
    @Setter
    @Getter
    protected Thread.UncaughtExceptionHandler uncaughtExceptionHandler = new Thread.UncaughtExceptionHandler() {

        public void uncaughtException(Thread t, Throwable e) {
            log.error("parse events has an error", e);
        }
    };
    @Setter
    @Getter
    protected EventList eventList;
    @Setter
    @Getter
    protected int eventListqQueueSize = 1024;
    @Setter
    @Getter
    protected AtomicBoolean needTransactionPosition = new AtomicBoolean(false);
    @Setter
    @Getter
    protected long lastEntryTime = 0L;
    @Setter
    @Getter
    protected volatile boolean heartBeatEnable = true; // 是否开启心跳检查
    @Setter
    @Getter
    protected Integer detectingIntervalInSeconds = 3; // 检测频率
    @Setter
    @Getter
    protected volatile Timer timer;
    @Setter
    @Getter
    protected TimerTask heartBeatTimerTask;
    @Setter
    @Getter
    protected Throwable exception = null;
    @Setter
    @Getter
    protected boolean gtidMode = false; // 是否是GTID模式
    @Setter
    @Getter
    protected boolean parallel = true; // 是否开启并行解析模式
    @Setter
    @Getter
    protected Integer parallelThreadSize = Runtime.getRuntime().availableProcessors() * 60 / 100; // 60%的能力跑解析,剩余部分处理网络
    @Setter
    @Getter
    protected int parallelBufferSize = 256; // 必须为2的幂
    @Setter
    @Getter
    protected MultistageCoordinator multistageCoordinator;
    @Setter
    @Getter
    protected ParserExceptionHandler parserExceptionHandler;
    @Setter
    @Getter
    protected long serverId;

    public AbstractEventParser() {
        // 初始化一下
        eventList = new EventList(new EntryListFlushCallback() {

            public void flush(List<Entry> entryList) throws InterruptedException {
                boolean ok = sink(entryList);
                if (!running) {
                    return;
                }
                if (!ok) {
                    throw new ParseException("consume failed!");
                }
                BinLogEventPosition lastTransactionEndBinLogEventPosition = buildLastTransactionEndBinLogEventPosition(entryList);
                if (lastTransactionEndBinLogEventPosition != null) { // 可能position为空
                    binLogEventPositionManager.persistBinLogEventPosition(AbstractEventParser.this.taskId, lastTransactionEndBinLogEventPosition);
                }
            }
        });
    }


    protected abstract BinlogParser buildBinlogParser();

    protected abstract Dumper buildDumper();

    protected abstract MultistageCoordinator buildMultistageCoordinator();

    protected abstract EntryPosition findStartEntryPosition(Dumper dumper) throws IOException;

    protected void preDump(Dumper dumper) {
    }

    protected boolean processTableMetaData(EntryPosition entryPosition) {
        return true;
    }

    protected void afterDump(Dumper dumper) {
    }

    public void start() {
        super.start();
        MDC.put("taskIdSequense", taskId);
        // 配置transaction bytes
        // 初始化缓冲队列
        eventList.setQueueSize(eventListqQueueSize);// 设置buffer大小
        eventList.start();
        // 构造bin log parser
        binLogEventParser = buildBinlogParser();// 初始化一下BinLogParser
        binLogEventParser.start();
        // 启动工作线程
        parseThread = new Thread(new Runnable() {

            public void run() {
                MDC.put("taskIdSequense", String.valueOf(taskId));
                Dumper dumper = null;
                while (running) {
                    try {
                        // 开始执行replication
                        // 1. 构造dumper
                        dumper = buildDumper();

                        // 2. 启动一个心跳线程
                        startHeartBeat(dumper);

                        // 3. 执行dump前的准备工作
                        preDump(dumper);

                        dumper.connect();// 链接

                        long queryServerId = dumper.queryServerId();
                        if (queryServerId != 0) {
                            serverId = queryServerId;
                        }
                        // 4. 获取最后的位置信息
                        EntryPosition startEntryPosition = findStartEntryPosition(dumper);
                        final EntryPosition entryPosition = startEntryPosition;
                        if (entryPosition == null) {
                            throw new PositionNotFoundException("can't find start readedIndex for " + taskId);
                        }
                        if (!processTableMetaData(entryPosition)) {
                            throw new ParseException("can't find init tableName meta for " + taskId
                                    + " with readedIndex : " + entryPosition);
                        }


                        // 重新链接，因为在找position过程中可能有状态，需要断开后重建
                        dumper.reconnect();

                        final Sink sink = new Sink<BinLogEvent>() {

                            private BinLogEventPosition lastBinLogEventPosition;

                            public boolean sink(BinLogEvent binLogEvent) {
                                try {
                                    Entry entry = binLogEventParser.parse(binLogEvent, false);//parseAndProfilingIfNecessary(binLogEvent, false);

                                    if (!running) {
                                        return false;
                                    }

                                    if (entry != null) {
                                        exception = null; // 有正常数据流过，清空exception
                                        eventList.add(entry);
                                        // 记录一下对应的positions
                                        this.lastBinLogEventPosition = buildLastTransactionEndBinLogEventPosition(entry);
                                        // 记录一下最后一次有数据的时间
                                        lastEntryTime = System.currentTimeMillis();
                                    }
                                    return running;
                                } catch (TableIdNotFoundException e) {
                                    throw e;
                                } catch (Throwable e) {
                                    if (e.getCause() instanceof TableIdNotFoundException) {
                                        throw (TableIdNotFoundException) e.getCause();
                                    }
                                    // 记录一下，出错的位点信息
                                    processSinkError(e, this.lastBinLogEventPosition, entryPosition.getBinlogFileName(),
                                            entryPosition.getBinlogFileOffset());
                                    throw new ParseException(e); // 继续抛出异常，让上层统一感知
                                }
                            }

                        };//new sink
                        /**
                         * 在dump的过程中进行字节转为事件
                         */
                        // 4. 开始dump数据
                        if (parallel) {
                            // build stage processor
                            multistageCoordinator = buildMultistageCoordinator();
                            if (isGtidMode()) {
                                // 判断所属instance是否启用GTID模式，是的话调用ErosaConnection中GTID对应方法dump数据
                                GtidSet gtidSet = GtidSetImpl.parse(entryPosition.getGtid());
                                ((MultistageCoordinatorImpl) multistageCoordinator).setGtidSet(gtidSet);
                                multistageCoordinator.start();
                                dumper.dump(multistageCoordinator, gtidSet);
                            } else {
                                multistageCoordinator.start();
                                if (StringUtils.isEmpty(entryPosition.getBinlogFileName()) && entryPosition.getExecuteTimestamp() != null) {
                                    dumper.dump(multistageCoordinator, entryPosition.getExecuteTimestamp());
                                } else {
                                    dumper.dump(multistageCoordinator, entryPosition.getBinlogFileName(), entryPosition.getBinlogFileOffset());
                                }
                            }
                        } else {
                            if (isGtidMode()) {
                                // 判断所属instance是否启用GTID模式，是的话调用ErosaConnection中GTID对应方法dump数据
                                dumper.dump(sink, GtidSetImpl.parse(entryPosition.getGtid()));
                            } else {
                                if (StringUtils.isEmpty(entryPosition.getBinlogFileName()) && entryPosition.getExecuteTimestamp() != null) {
                                    dumper.dump(sink, entryPosition.getExecuteTimestamp());
                                } else {
                                    dumper.dump(sink, entryPosition.getBinlogFileName(), entryPosition.getBinlogFileOffset());
                                }
                            }
                        }
                    } catch (TableIdNotFoundException e) {
                        exception = e;
                        // 特殊处理TableIdNotFound异常,出现这样的异常，一种可能就是起始的position是一个事务当中，导致tablemap
                        // Event时间没解析过
                        needTransactionPosition.compareAndSet(false, true);
                        log.error(String.format("dump address %s has an error, retrying. caused by ",
                                runningAuthenticationInfo.getInetSocketAddress().toString()), e);
                    } catch (Throwable e) {
                        log.error($.printStackTraceToString(e));
                        if (Constants.developMode) {
                            e.printStackTrace();
                        }
                        processDumpError(e);
                        exception = e;
                        if (!running) {
                            if (!(e instanceof java.nio.channels.ClosedByInterruptException
                                    || e.getCause() instanceof java.nio.channels.ClosedByInterruptException)) {
                                throw new ParseException(String.format("dump address %s has an error, retrying. ",
                                        runningAuthenticationInfo.getInetSocketAddress().toString()), e);
                            }
                        } else {
                            log.error(String.format("dump address %s has an error, retrying. caused by ",
                                    runningAuthenticationInfo.getInetSocketAddress().toString()), e);

                        }
                        if (parserExceptionHandler != null) {
                            parserExceptionHandler.handle(e);
                        }
                    } finally {
                        // 重新置为中断状态
                        Thread.interrupted();
                        // 关闭一下链接
                        afterDump(dumper);
                        try {
                            if (dumper != null) {
                                dumper.disconnect();
                            }
                        } catch (IOException e1) {
                            if (!running) {
                                throw new ParseException(String.format("disconnect address %s has an error, retrying. ",
                                        runningAuthenticationInfo.getInetSocketAddress().toString()), e1);
                            } else {
                                log.error("disconnect address {} has an error, retrying., caused by ",
                                        runningAuthenticationInfo.getInetSocketAddress().toString(), e1);
                            }
                        }
                    }
                    // 出异常了，退出sink消费，释放一下状态
                    eventListSink.interrupt();
                    eventList.reset();// 重置一下缓冲队列，重新记录数据
                    binLogEventParser.reset();// 重新置位
                    if (multistageCoordinator != null && multistageCoordinator.isStart()) {
                        // 处理 RejectedExecutionException
                        try {
                            multistageCoordinator.stop();
                        } catch (Throwable t) {
                            log.debug("multi processor rejected:", t);
                        }
                    }

                    if (running) {
                        // sleep一段时间再进行重试
                        try {
                            Thread.sleep(10000 + RandomUtils.nextInt(10000));
                        } catch (InterruptedException e) {
                        }
                    }
                }
                MDC.remove("taskIdSequense");
            }
        });

        parseThread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
        parseThread.setName(String.format("taskIdSequense = %s , address = %s , EventParser", taskId, runningAuthenticationInfo == null ? null : runningAuthenticationInfo.getInetSocketAddress()));
        parseThread.start();
    }

    public void stop() {
        super.stop();

        stopHeartBeat(); // 先停止心跳
        parseThread.interrupt(); // 尝试中断
        eventListSink.interrupt();

        if (multistageCoordinator != null && multistageCoordinator.isStart()) {
            try {
                multistageCoordinator.stop();
            } catch (Throwable t) {
                log.debug("multi processor rejected:", t);
            }
        }

        try {
            parseThread.join();// 等待其结束
        } catch (InterruptedException e) {
            // ignore
        }

        if (binLogEventParser.isStart()) {
            binLogEventParser.stop();
        }
        if (eventList.isStart()) {
            eventList.stop();
        }
    }

    protected boolean sink(List<Entry> entryList)
            throws SinkException, InterruptedException {
        boolean ok = eventListSink.sink(entryList, (runningAuthenticationInfo == null) ? null : runningAuthenticationInfo.getInetSocketAddress(), taskId);
        return ok;
    }


    protected BinLogEventPosition buildLastTransactionEndBinLogEventPosition(List<Entry> entryList) { // 初始化一下
        for (int i = entryList.size() - 1; i > 0; i--) {
            Entry entry = entryList.get(i);
            if (entry.getEntryType() == EntryType.transaction_end) {// 尽量记录一个事务做为position
                return buildLastTransactionEndBinLogEventPosition(entry);
            }
        }

        return null;
    }

    protected BinLogEventPosition buildLastTransactionEndBinLogEventPosition(Entry entry) { // 初始化一下
        BinLogEventPosition binLogEventPosition = new BinLogEventPosition();
        //
        SlavePosition slavePosition = new SlavePosition(runningAuthenticationInfo.getInetSocketAddress(), -1L);
        binLogEventPosition.setSlaveId(slavePosition);
        //
        EntryPosition entryPosition = new EntryPosition();
        entryPosition.setServerId(entry.getEntryHeader().getServerId());
        entryPosition.setGtid(entry.getEntryHeader().getGtid());
        entryPosition.setBinlogFileName(entry.getEntryHeader().getBinlogFileName());
        entryPosition.setBinlogFileOffset(entry.getEntryHeader().getBinlogFileOffset());
        entryPosition.setExecuteTimestamp(entry.getEntryHeader().getExecuteTimestamp());
        //
        binLogEventPosition.setEntryPosition(entryPosition);
        return binLogEventPosition;
    }

    protected void processSinkError(Throwable e, BinLogEventPosition lastPosition, String startBinlogFile, Long startPosition) {
        if (lastPosition != null) {
            log.warn(String.format("ERROR ## parse this binlog_event has an error , last readedIndex : [%s]",
                    lastPosition.getEntryPosition()), e);
        } else {
            log.warn(String.format("ERROR ## parse this binlog_event has an error , last readedIndex : [%s,%s]",
                    startBinlogFile, startPosition), e);
        }
    }

    protected void processDumpError(Throwable e) {
        // do nothing
    }

    protected void startHeartBeat(Dumper dumper) {
        lastEntryTime = 0L; // 初始化
        if (timer == null) {// lazy初始化一下
            String name = String.format("taskIdSequense = %s , address = %s , HeartBeatTimeTask", taskId,
                    runningAuthenticationInfo == null ? null : runningAuthenticationInfo.getInetSocketAddress().toString());
            synchronized (AbstractEventParser.class) {
                // synchronized (MysqlEventParser.class) {
                // why use MysqlEventParser.class, u know, MysqlEventParser is
                // the child class 4 AbstractEventParser,
                // do this is ...
                if (timer == null) {
                    timer = new Timer(name, true);
                }
            }
        }

        if (heartBeatTimerTask == null) {// fixed issue #56，避免重复创建heartbeat线程
            heartBeatTimerTask = buildHeartBeatTimeTask(dumper);
            Integer detectingIntervalInSeconds = this.detectingIntervalInSeconds;
            timer.schedule(heartBeatTimerTask, detectingIntervalInSeconds * 1000L, detectingIntervalInSeconds * 1000L);
            log.info("start heart beat.... ");
        }
    }

    protected TimerTask buildHeartBeatTimeTask(Dumper dumper) {
        return new TimerTask() {

            public void run() {
                try {
                    if (exception == null || lastEntryTime > 0) {
                        // 如果未出现异常，或者有第一条正常数据
                        long now = System.currentTimeMillis();
                        long inteval = (now - lastEntryTime) / 1000;
                        if (inteval >= detectingIntervalInSeconds) {
                            EntryHeader entryHeader = new EntryHeader();
                            entryHeader.setExecuteTimestamp(now);
                            Entry entry = new Entry();
                            entry.setEntryHeader(entryHeader);
                            entry.setEntryType(EntryType.heartbeat);
                            // 提交到sink中，目前不会提交到store中，会在sink中进行忽略
                            sink(Arrays.asList(entry));
                        }
                    }

                } catch (Throwable e) {
                    log.warn("heartBeat run failed ", e);
                }
            }

        };
    }

    protected void stopHeartBeat() {
        lastEntryTime = 0L; // 初始化
        if (timer != null) {
            timer.cancel();
            timer = null;
        }
        heartBeatTimerTask = null;
    }


    public void setParallelThreadSize(Integer parallelThreadSize) {
        if (parallelThreadSize != null) {
            this.parallelThreadSize = parallelThreadSize;
        }
    }

}
